{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "364cd143-d987-4fda-800e-b5dd0af97a79",
   "metadata": {},
   "outputs": [],
   "source": [
    "%session_id_prefix iceberg-sql-\n",
    "%glue_version 3.0\n",
    "%idle_timeout 60\n",
    "%connections <your-iceberg-connection-name>\n",
    "%%configure \n",
    "{\n",
    "  \"--conf\": \"spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\"\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fc76d99b-caf5-4663-bda0-1771abd8efdf",
   "metadata": {},
   "outputs": [],
   "source": [
    "catalog_name = \"glue_catalog\"\n",
    "bucket_name = \"<Your S3 bucket name>\"\n",
    "bucket_prefix = \"<Your S3 bucket prefix>\"\n",
    "database_name = \"iceberg_sql\"\n",
    "table_name = \"product\"\n",
    "warehouse_path = f\"s3://{bucket_name}/{bucket_prefix}\"\n",
    "dynamodb_table = 'myGlueLockTable'"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "223f52c4",
   "metadata": {},
   "source": [
    "## Initialize SparkSession"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "07d9de08-c473-4591-bd5a-ae7106d9addc",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import SparkSession\n",
    "spark = SparkSession.builder \\\n",
    "    .config(\"spark.sql.warehouse.dir\", warehouse_path) \\\n",
    "    .config(f\"spark.sql.catalog.{catalog_name}\", \"org.apache.iceberg.spark.SparkCatalog\") \\\n",
    "    .config(f\"spark.sql.catalog.{catalog_name}.warehouse\", warehouse_path) \\\n",
    "    .config(f\"spark.sql.catalog.{catalog_name}.catalog-impl\", \"org.apache.iceberg.aws.glue.GlueCatalog\") \\\n",
    "    .config(f\"spark.sql.catalog.{catalog_name}.io-impl\", \"org.apache.iceberg.aws.s3.S3FileIO\") \\\n",
    "    .config(f\"spark.sql.catalog.{catalog_name}.lock-impl\", \"org.apache.iceberg.aws.glue.DynamoLockManager\") \\\n",
    "    .config(f\"spark.sql.catalog.{catalog_name}.lock.table\", dynamodb_table) \\\n",
    "    .config(\"spark.sql.extensions\", \"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\") \\\n",
    "    .getOrCreate()"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "19d274a4-1610-43a6-a574-b18894a2f73b",
   "metadata": {},
   "source": [
    "## Clean up existing resources"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "3e8254c1-b5d6-4b03-92b3-ceffd90edd79",
   "metadata": {},
   "outputs": [],
   "source": [
    "query = f\"\"\"\n",
    "DROP TABLE IF EXISTS {catalog_name}.{database_name}.{table_name}\n",
    "\"\"\"\n",
    "spark.sql(query)"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "5078b110-be78-43fe-8ffe-93cd013c7b0c",
   "metadata": {},
   "source": [
    "## Create Iceberg table with sample data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "fba92930-daf3-4b20-ae73-25b0b6c82d90",
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import Row\n",
    "import time\n",
    "\n",
    "ut = time.time()\n",
    "\n",
    "product = [\n",
    "    {'product_id': '00001', 'product_name': 'Heater', 'price': 250, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00002', 'product_name': 'Thermostat', 'price': 400, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00003', 'product_name': 'Television', 'price': 600, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00004', 'product_name': 'Blender', 'price': 100, 'category': 'Electronics', 'updated_at': ut},\n",
    "    {'product_id': '00005', 'product_name': 'USB charger', 'price': 50, 'category': 'Electronics', 'updated_at': ut}\n",
    "]\n",
    "\n",
    "df_products = spark.createDataFrame(Row(**x) for x in product)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "cb9bf071-a9f9-4a26-a362-f2b6250a6c82",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_products.createOrReplaceTempView(f\"tmp_{table_name}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "aa310535",
   "metadata": {},
   "outputs": [],
   "source": [
    "query = f\"\"\"\n",
    "CREATE DATABASE IF NOT EXISTS {database_name}\n",
    "\"\"\"\n",
    "spark.sql(query)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a07c0a18-c73c-4393-ac79-427e4eef21c9",
   "metadata": {},
   "outputs": [],
   "source": [
    "query = f\"\"\"\n",
    "CREATE TABLE {catalog_name}.{database_name}.{table_name}\n",
    "USING iceberg\n",
    "AS SELECT * FROM tmp_{table_name}\n",
    "\"\"\"\n",
    "spark.sql(query)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "faee90d6-46c8-409a-8baa-bd20aa6e444d",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "USE iceberg_sql"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "1fdcedac",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SHOW TABLES"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "48d42e63-fc96-4047-96fb-ad388f16a8bb",
   "metadata": {},
   "source": [
    "## Read from Iceberg table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "a3f30f34-b750-4b65-b002-f342efe910de",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM glue_catalog.iceberg_sql.product"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "edc9062b-d134-4e78-879e-d9c51e85fdad",
   "metadata": {},
   "source": [
    "## Upsert records into Iceberg table"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "92624355-7e93-4c65-950c-3d0ccd0110b6",
   "metadata": {},
   "outputs": [],
   "source": [
    "ut = time.time()\n",
    "\n",
    "product_updates = [\n",
    "    {'product_id': '00001', 'product_name': 'Heater', 'price': 400, 'category': 'Electronics', 'updated_at': ut}, # Update\n",
    "    {'product_id': '00006', 'product_name': 'Chair', 'price': 50, 'category': 'Furniture', 'updated_at': ut} # Insert\n",
    "]\n",
    "df_product_updates = spark.createDataFrame(Row(**x) for x in product_updates)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "06524e63-1ca4-4e91-8b48-4165e5bdca8b",
   "metadata": {},
   "outputs": [],
   "source": [
    "df_product_updates.createOrReplaceTempView(f\"tmp_{table_name}_updates\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "38a4e5b9-49b1-4b7c-9388-cb13f33c62c3",
   "metadata": {},
   "outputs": [],
   "source": [
    "query = f\"\"\"\n",
    "MERGE INTO {catalog_name}.{database_name}.{table_name} AS t\n",
    "USING (SELECT * FROM tmp_{table_name}_updates) AS u\n",
    "ON t.product_id = u.product_id\n",
    "WHEN MATCHED THEN UPDATE SET t.updated_at = u.updated_at\n",
    "WHEN NOT MATCHED THEN INSERT *\n",
    "\"\"\"\n",
    "spark.sql(query)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "d62c4865-125f-4619-9f58-d2ec4bd4b562",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM glue_catalog.iceberg_sql.product"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "9e209781",
   "metadata": {},
   "source": [
    "## Delete records"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "ea9793ea",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "DELETE FROM glue_catalog.iceberg_sql.product WHERE product_name == \"Blender\""
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "214029da",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM glue_catalog.iceberg_sql.product"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "c546f7e4",
   "metadata": {},
   "source": [
    "## View History and Snapshots"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "15a368b3",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM glue_catalog.iceberg_sql.product.history"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "83b9e822",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT * FROM glue_catalog.iceberg_sql.product.snapshots"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "82c05698",
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "SELECT h.made_current_at, s.operation, h.snapshot_id, h.is_current_ancestor, s.summary[\"spark.app.id\"] FROM glue_catalog.iceberg_sql.product.history h JOIN glue_catalog.iceberg_sql.product.snapshots s  ON h.snapshot_id = s.snapshot_id ORDER BY made_current_at"
   ]
  },
  {
   "cell_type": "markdown",
   "id": "98770309",
   "metadata": {},
   "source": [
    "## Stop Session"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "id": "53ccca8e-1692-4da6-af55-9ab52d6fbc08",
   "metadata": {},
   "outputs": [],
   "source": [
    "%stop_session"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Glue PySpark",
   "language": "python",
   "name": "glue_pyspark"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 5
}
